package com.yeeyk.demo.handler.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.yeeyk.demo.common.Constants;
import com.yeeyk.demo.dto.ExcelRom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Spout that emits the rows of a pre-loaded {@link ExcelRom} list one at a time.
 *
 * <p>Each tuple is emitted with the row's list index (a boxed {@link Integer}) as its
 * message id, so that {@link #ack(Object)} and {@link #fail(Object)} can look the row up
 * again in {@code data}. Acked rows are added to {@code Constants.TREATED_SUCCESS_SET};
 * failed rows are re-emitted while their retry counter is within
 * {@code Constants.StormConstants.RETRY_EMIT}, and added to
 * {@code Constants.TREATED_FAIL_SET} afterwards.
 */
public class MessageSpout implements IRichSpout {
	private static final Logger LOGGER = LoggerFactory.getLogger(MessageSpout.class);
	private static final long serialVersionUID = 1L;

	/** Index of the next row of {@code data} to emit. */
	private final AtomicInteger index = new AtomicInteger(0);
	/** Rows to emit; may be null or empty, in which case nothing is emitted. */
	private final List<ExcelRom> data;
	/** Collector handed over by {@link #open}; used for every emit. */
	private SpoutOutputCollector collector;
	/** Set once {@link #nextTuple()} finds nothing left to emit. */
	private final AtomicBoolean sendComplete = new AtomicBoolean(Boolean.FALSE);
	/** Counted down by {@link #fail(Object)} when it decides processing is finished. */
	private final CountDownLatch countDownLatch;

	/**
	 * Creates a spout over the given rows.
	 *
	 * @param data           rows to emit, in list order
	 * @param countDownLatch latch counted down from {@link #fail(Object)} once the
	 *                       completion condition there is met
	 */
	public MessageSpout(List<ExcelRom> data, CountDownLatch countDownLatch) {
		this.countDownLatch = countDownLatch;
		this.data = data;
	}

	/**
	 * Stores the collector used for all subsequent emits.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector collector to emit tuples through
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Emits the next unsent row, or marks sending as complete when there is none left
	 * (including when {@code data} is null or empty).
	 */
	@Override
	public void nextTuple() {
		int current = index.get();
		if (data != null && current < data.size()) {
			ExcelRom excelRom = data.get(current);
			// emit(tuple, msgId): the message id must be an immutable Integer snapshot of
			// the index. Passing the AtomicInteger itself would make the (Integer) casts in
			// ack()/fail() throw ClassCastException, and its value would change on the
			// increment below.
			collector.emit(new Values(excelRom), Integer.valueOf(current));
			index.incrementAndGet();
		} else {
			sendComplete.set(Boolean.TRUE);
		}
	}

	/**
	 * Declares the single output field, named by
	 * {@code Constants.StormConstants.SPOUT_FEILD_NAME}.
	 *
	 * @param declarer declarer to register the output fields with
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(Constants.StormConstants.SPOUT_FEILD_NAME));
	}

	/**
	 * Records the acknowledged row in {@code Constants.TREATED_SUCCESS_SET}.
	 *
	 * <p>NOTE(review): the completion check / latch count-down only happens in
	 * {@link #fail(Object)}; confirm whether a run without failures is expected to release
	 * the latch some other way.
	 *
	 * @param msgId the {@link Integer} row index that was used as message id on emit
	 */
	@Override
	public void ack(Object msgId) {
		LOGGER.info("【消息发送成功!!!】 (msgId = " + msgId +")");
		Constants.TREATED_SUCCESS_SET.add(data.get((Integer) msgId));
	}

	/**
	 * Handles a failed row: re-emits it while its retry counter is within the limit,
	 * otherwise records it in {@code Constants.TREATED_FAIL_SET}; then checks whether the
	 * whole run can be flagged as complete.
	 *
	 * @param msgId the {@link Integer} row index that was used as message id on emit
	 */
	@Override
	public void fail(Object msgId) {
		LOGGER.info("【消息发送失败!!!】  (msgId = " + msgId +")");
		// NOTE(review): the two "resend" log lines are written on both branches below,
		// i.e. also when the row is given up on rather than re-emitted.
		LOGGER.info("【重发进行中...】");
		ExcelRom excelRom = data.get((Integer) msgId);
		if (excelRom.getRetry() <= Constants.StormConstants.RETRY_EMIT) {
			excelRom.setRetry(excelRom.getRetry() + 1);
			// Re-emit under the same message id so a later ack/fail maps to the same row.
			collector.emit(new Values(excelRom), msgId);
		} else {
			Constants.TREATED_FAIL_SET.add(excelRom);
		}
		LOGGER.info("【重发成功!!!】");

		// Has everything been sent? If so, and the fail set is empty or holds only rows
		// that exhausted their retries, flag completion and count the latch down.
		if (sendComplete.get()) {
			boolean nothingLeftToRetry = Constants.TREATED_FAIL_SET.isEmpty()
					|| Constants.TREATED_FAIL_SET.stream()
							.allMatch((rom) -> rom.getRetry() > Constants.StormConstants.RETRY_EMIT);
			if (nothingLeftToRetry) {
				Constants.TREATED_COMPLETE.set(Boolean.TRUE);
				countDownLatch.countDown();
			}
		}
	}

	/** No resources to release. */
	@Override
	public void close() {

	}

	/** No action on activation. */
	@Override
	public void activate() {

	}

	/** No action on deactivation. */
	@Override
	public void deactivate() {

	}

	/**
	 * @return {@code null}; this spout supplies no component-specific configuration
	 */
	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
